多线程(七)--- 三种方式实现生产者-消费者

生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:

  1. 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
  2. 如果共享数据区为空的话,阻塞消费者继续消费数据;

在实现生产者消费者问题时,可以采用三种方式:

1.使用 Object 的 wait/notify 的消息通知机制;

2.使用 Lock 的 Condition 的 await/signal 的消息通知机制;

3.使用 BlockingQueue 实现。本文主要将这三种实现方式进行总结归纳。

1. wait/notify 机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package 多线程.Producer_Consumer;


import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 我有三个问题:
* 1. 为何每次都是 producer 和 consumer 连着一起?是 jvm 内置偏向锁的原因吗?
* 2. 为何 condition.await() 那块用 if 会报错,但是用 while 就可以?
* 3. 按道理来说,consumer 和 producer 不应该有资源竞争关系,所以合理来说应该是两把锁,如果 consumer 和 producer 分两把锁来操作又如何完成通信呢?
*
*/
public class wait_notify {
private final static Object a = 1;
private final static LinkedList<Long> linkedList = new LinkedList<>();
private final static int MAX_CAPACITY = 10;

public static void main(String[] args) {
System.out.println("wait/notify");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
try {
wait_notify.producer();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
System.out.println("-----------------");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
try {
wait_notify.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

}

static void producer() throws InterruptedException {
synchronized (a){
while(linkedList.size() == MAX_CAPACITY){
a.wait();
}
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":-------PRODUCER-------- " + value);
linkedList.addLast(value);
a.notifyAll();

}
}

static void consumer() throws InterruptedException {
synchronized (a){
while (linkedList.size() == 0){
a.wait();
}
long value = linkedList.pop();
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
a.notifyAll();
}

}
}

2. await/signal 机制

这个我在之前有写过。不过其实我觉得,producer 和 consumer 应该两把锁会更加合适一些,生产者和消费者不应该有资源冲突。而这刚好就是 ArrayBlockingQueue 和 LinkedBlockingQueue 的区别,前者是生产者和消费者共一把独占锁,但是后者是分开,两把锁,这样并发的效率也就越高,线程等待的机会会更少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package 多线程.Producer_Consumer;


import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class await_signal {
private final static Lock lock = new ReentrantLock();
private final static Condition producer_condition = lock.newCondition();
private final static Condition consumer_condition = lock.newCondition();
private final static LinkedList<Long> linkedList = new LinkedList<>();
private final static int MAX_CAPACITY = 100;

public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
while (true) {
await_signal.producer();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 14; i++) {
while (true) {
await_signal.consumer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

}

private static void producer() {
try {
lock.lock();
if (linkedList.size() > MAX_CAPACITY) {
producer_condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":-------PRODUCER-------- " + value);
linkedList.addLast(value);
consumer_condition.signalAll();
lock.unlock();
}
}

private static void consumer() {
try {
lock.lock();
if (linkedList.size() == 0) {
consumer_condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
long value = linkedList.pop();
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
producer_condition.signalAll();
lock.unlock();
}
}
}

上面的代码写的有点问题,为了保证思考的过程呈现,我就不删除了:

  1. 首先是主程序 main 中创建线程写错了…这样只会创建两个线程…低级失误;

  2. 在 await 时,用 if 会报错,但是用 while 没问题。

然后写完之后我发现输出很奇怪,并且又产生了新的疑问:

  1. 为何每次都是 producer 和 consumer 连着一起?
  2. 为何 await 那块用 if 会报错,但是用 while 就可以?
  3. 按道理来说,consumer 和 producer 不应该有资源竞争关系,所以合理来说应该是两把锁,如果 consumer 和 producer 分两把锁来操作又如何完成呢?

为了更好地解决以上几个疑问,我简化了一下例子,将 producer 的线程降为 1 个,将 consumer 的线程 降为 3 个。

Q1:为何每次都是 producer 和 consumer 连着一起?

经过多次测试发现,不会总是 producer 和 consumer 连在一起出现,只是最开始的时候会连在一起出现,为什么呢?因为在最开始的时候,producer是先启动的,那么在同步队列中排队,producer是在前面的,所以率先抢到锁,consumer等producer抢完之后才开始抢锁消费,但是后期由于偏向锁和Condition的原因,就不会 producer连着出现的情况了。

Q2:为何 await 那块用 if 会报错,但是用 while 就可以?

为了解决这个问题,我特意将 producer 的线程降为 1 个,将 consumer 的线程 降为 3 个,结果是依然会报错。

新的 demo 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package 多线程.Producer_Consumer;


import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 我有三个问题:
* 1. 为何每次都是 producer 和 consumer 连着一起?是 jvm 内置偏向锁的原因吗?
* 2. 为何 condition.await() 那块用 if 会报错,但是用 while 就可以?
* 3. 按道理来说,consumer 和 producer 不应该有资源竞争关系,所以合理来说应该是两把锁,如果 consumer 和 producer 分两把锁来操作又如何完成通信呢?
*
*/
public class await_signal {
private final static Lock lock = new ReentrantLock();
private final static Condition producer_condition = lock.newCondition();
private final static Condition consumer_condition = lock.newCondition();
private final static LinkedList<Long> linkedList = new LinkedList<>();
private final static int MAX_CAPACITY = 10;

public static void main(String[] args) {
System.out.println("producer-consumer 同一把锁");

for (int i = 0; i < 1; i++) {
new Thread(() -> {
while (true) {
await_signal.producer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
System.out.println("-----------------");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
await_signal.consumer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

}

static void producer() {
try {
lock.lock();
// 为何这一块用 if 不行?
if (linkedList.size() == MAX_CAPACITY) {
producer_condition.await();
}
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":-------PRODUCER-------- " + value);
linkedList.addLast(value);
consumer_condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {

lock.unlock();
}
}

static void consumer() {
try {
lock.lock();
// 为何这一块用 if 不行?
if (linkedList.size() == 0) {
consumer_condition.await();
}
long value = linkedList.pop();
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
producer_condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

如上,我们来分析一下为何要用 while,下面我把程序整个运行过程讲一遍。

  1. 首先我们清楚 Thread-0 是 producer,Thread 1 、2、3 都是 consumer;
  2. 在最开始, Thread-0 最先启动,所以它也理所当然的第一个抢到了锁
    image-20200301211615114

image-20200301212112734

从图中我们也可以看出,此时抢到锁的 exclusiveOwnerThread 为 Thread 0,从队列中我们可以看出剩下的三个 Thread 已经全部进入到同步队列,且顺序是 Thread 3 - Thread 1 - Thread 2(后面那个 5 是指优先级,不用管),为什么不可能进入到条件队列呢?因为 nextWaiter = null,且从 waitStatus = -1可知,三个线程都在同步队列中等待被唤醒。所以我们预测下一个抢到锁的一定是 Thread 3,他会去消费 Thread 0 刚刚生产出的消息,让我们接着看;

  1. 正如我们所料,真的是 Thread 3抢到了锁然后去消费了。
1
2
3
4
producer-consumer 同一把锁
-----------------
Thread-0:-------PRODUCER-------- 1583068539426
Thread-3::-------CONSUMER-------- 1583068539426

我们再通过 debug 的变量验证一下。

image-20200301213007786

看,真的 Thread 3 抢到锁了。不信,继续看下面更详细的信息:

image-20200301213437925

如图我们可以看到,Thread 3 消费掉了消息,并且注意到同步队列现在变为了 Thread 1 - Thread 2 - Thread 0,开始是 3 - 1 - 2,现在就变成了 1 - 2 - 0,你还别说,这种同步真的速度有点慢,Thread 0 都休息完了 3 秒了,前面那两老哥还在等着呢,不过这里还有一个要注意的地方,就是 Thread 0 的 waitStatus = 0,这是为啥呢,凭啥别人都是 waitStatus = -1呢,回顾一下,watiStatus = -1意思就是等待被唤醒,waitStatus = 0 意思就是下个节点是 null,当然也可能是下个节点还没初始化完,但是这里是 null。

这下总该肯定是 Thread 3 消费了吧,我们接着往下看。当然了,我们先预测一波,下面要抢锁的肯定是 1号老大哥,然后此时已经没有消息给他消费了,那他就只能掉入 Condition 的大坑了,只能从同步队列移到条件队列了,好苦逼啊,等了这么久,还得换个地方接着等!

  1. 我们来看看,结果是不是如我们预料的一样。

image-20200301214231056

果然,接着就是 1 号老大哥在运行,不信我们看 debug 详细的参数:

image-20200301215139371

握草,我被打脸了…虽然此时的确是 Thread 1 在执行,但是!!!玛德条件队列竟然不是 1号老大哥,是刚刚消费消息的 3 号小老弟,它在搞什么??????仔细思考一下,我们得知道 jvm 默认开启了偏向锁,至于什么是偏向锁,那就回到多线程(一)去复习吧,3 号小老弟使用了外挂—偏向锁「错了!!!!纠正一下,压根不是用的偏向锁,这里早就升级到了重量级锁,哪里来的偏向锁,真正插队的原因是这里是非公平锁!!!!!」,于是它很恶心的插队了,本来按道理来说,应该是 1号老哥抢到锁,然后因为没有消息可以消费,所以它从同步队列移入到条件队列中等到被 signal,然后同步队列理应就剩下 2 - 0,然后可能 3 号执行完了接在后面。但是现实是什么呢?那就是 3 号小老弟使用外挂偏向锁,直接插队,率先获取锁,由于没有消息可以消费,所以它从同步队列移入到了条件队列,开始等待,我们也可以看到,图中 consumer_condition的队列中只有一个 Thread,那就是 3 号小老弟,3号小老弟在条件队列中等待之后,暂时放弃了锁,于是终于尼玛轮到我 1 号老大哥了,我们可以看到,此时的确是 1号开始运行,然后同步队列中是 2 - 0 - null,即两个队列,这的确是符合我们现在的想法。

虽然刚才预测错了,但是那是因为 3 号使用了外挂,所以我还是接着预测吧,毕竟我预测的其实还是挺准的。此时因为没有消息可以消费,所以 1 号老大哥很可怜,跟 3 号小弟一样的下场,从同步队列转移到条件队列队尾继续等待,按道理来说 1 号将要在条件队列中,也就是 3 - 1,然后他让出锁,等待被唤醒,此时同步队列中队首应该是 2 号老哥了,终于轮到人家了…当然它应该也是很不幸的,只能跟 3 - 1一样的下场,回到条件队列队尾继续待着…预测完了,我们来看看真实情况。

  1. 看结果的确是 2 号老哥在运行着:

image-20200301220439196

我们来看一下具体的参数,验证一下:

image-20200301220935496

完美,预测的一模一样。2 号抢到锁之后,同步队列就只剩下了 0 号,并且 waitStatus = 0,说明这是队尾了,再提醒一下哈,当前节点的 waitStatus 的值是表示下一节点的状态,比如说 waitStatus = -1说明的是该节点的下一个节点已经准备好被唤醒了。然后我们再看一下条件队列的情况,情况和我们思考的是一样的,条件队列中现在就 2 个 Thread,分别是 3 - 1,当然了,马上 2 就要加入他们了,因为此时还是没有消息可以消费,我们接着往下看。

  1. 啊啊啊啊,不知道为啥,可能是我点错了,突然就出现了这样的结果。 并且条件队列为空。
1
2
3
4
5
6
7
producer-consumer 同一把锁
-----------------
Thread-0:-------PRODUCER-------- 1583068539426
Thread-3::-------CONSUMER-------- 1583068539426
Thread-0:-------PRODUCER-------- 1583068539427
Thread-3::-------CONSUMER-------- 1583068539427
Exception in thread "Thread-1"

不要着急,我们来分析一波,用理论战胜错误。我们刚才是这样的情况,输出框是

1
2
3
4
producer-consumer 同一把锁
-----------------
Thread-0:-------PRODUCER-------- 1583068539426
Thread-3::-------CONSUMER-------- 1583068539426

说明这期间有一次生产,有一次消费,而此时我们的条件队列应该是 3 - 1 - 2,同步队列只剩下 0。所以此时只剩下 0 可以抢锁了,于是 0 拿到了锁,然后生产出了一条消息,这个可以说得通,此时生产完之后,它调用 consumer_condition.signalall(),把条件队列中的 3 个都给唤醒了,3 号小老弟由于是在条件队列队首,所以它第一个从条件队列中加入到同步队列,1 号 和 2 号 老大哥紧随其后加入到同步队列,于是他第一个抢到锁,继续执行自己的逻辑,注意,我们终于要解决第二个问题了!!!!他发现此时有消息可以消费,于是他进行了消费,此时已经没有消息可以消费了,但是 1号老大哥不知道啊,它刚刚被唤醒,兴高采烈的以为终于可以消费了,因为我们写的是 if,所以它不会再去判断是否还要消息可以消费,因为它以为肯定有消息啊对吧,不然你唤醒我干嘛!于是他去消费,poll()一个空list,这不是找报错嘛…所以,就真的报错了。

所以说,遇到问题不要慌,理论掌握的透透的,不会出现意外情况的,我们来总结一下为什么要用 while 而不能用 if:

  • 被唤醒之后,条件队列中的 Thread 会加入到 同步队列中,如果此时同步队列中还有 Thread,那么条件队列中被唤醒的 Thread 必须跟在它们的后面,如果此时同步队列中的 Thread 把消息消费完了,我后面的刚才条件队列中被唤醒的 Thread,根本无法知道没有消息了,因为它们一拿到 lock 就会继续往下执行;
  • 当然了,刚才是两批人的战斗,条件队列算一批人,同步队列算一批人,同步队列的把人家条件队列的人给害了还不告诉人家,恶心!当然最恶心的还是自家人的斗争,也就是条件队列中的 Thread 的自相残杀,条件队列后面的压根不知道前面的消费完了,所以也会报错。

因此,将 if 改为 while,我在被唤醒之后依旧可以首先判断一下是否还有消息,这是非常有必要的。

Q3:消费者-生产者使用两把锁,又如何完成呢?

的确,消费者和生产者本就不应该放到一把锁中去控制,我消费我的,你生产你的,只是各有一种情况停止而已,所以使用两把锁才是非常合理的设计,我之前也有说,这就是 ArrayBlockingQueue 和 LinkedBlockingQueue 最大的区别。所以,接下来这个问题,我希望能借助于 LinkedBlockingQueue 的源码来解决这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package 多线程.Producer_Consumer;


import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 我有三个问题:
* 1. 为何每次都是 producer 和 consumer 连着一起?是 jvm 内置偏向锁的原因吗?
* 2. 为何 condition.await() 那块用 if 会报错,但是用 while 就可以?
* 3. 按道理来说,consumer 和 producer 不应该有资源竞争关系,所以合理来说应该是两把锁,如果 consumer 和 producer 分两把锁来操作又如何完成通信呢?
*
*/
public class await_signal {
private final static Lock lock1 = new ReentrantLock();
private final static Lock lock2 = new ReentrantLock();
private final static Condition producer_condition = lock1.newCondition();
private final static Condition consumer_condition = lock2.newCondition();
private final static LinkedList<Long> linkedList = new LinkedList<>();
private final static int MAX_CAPACITY = 5;

public static void main(String[] args) {
System.out.println("producer-consumer 两把锁");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
await_signal.producer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
System.out.println("-----------------");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
await_signal.consumer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

}

static void producer() {
int size = 0;
try {
lock1.lock();
while (linkedList.size() == MAX_CAPACITY) {
producer_condition.await();
}
size = linkedList.size();
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":-------PRODUCER-------- " + value);
linkedList.addLast(value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock1.unlock();
}
if(size == 0){
try{
lock2.lock();
consumer_condition.signalAll();
}finally {
lock2.unlock();
}
}

}

static void consumer() {
int size = 0;
try {
lock2.lock();
while (linkedList.size() == 0) {
consumer_condition.await();
}
size = linkedList.size();
long value = linkedList.pop();
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock2.unlock();
}
if(size == MAX_CAPACITY){
try{
lock1.lock();
producer_condition.signalAll();
}finally {
lock1.unlock();
}
}
}
}

上述就是完全借鉴于 LinkedBlockingQueue,但是源码里面根本没有调用 signalAll(),而是利用两把锁的优势,只需要唤醒条件队列的第一个,然后一个接一个去唤醒就可以了,这个就很妙!!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package 多线程.Producer_Consumer;


import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 我有三个问题:
* 1. 为何每次都是 producer 和 consumer 连着一起?是 jvm 内置偏向锁的原因吗?
* 2. 为何 condition.await() 那块用 if 会报错,但是用 while 就可以?
* 3. 按道理来说,consumer 和 producer 不应该有资源竞争关系,所以合理来说应该是两把锁,如果 consumer 和 producer 分两把锁来操作又如何完成通信呢?
*
*/
public class await_signal {
private final static Lock lock1 = new ReentrantLock();
private final static Lock lock2 = new ReentrantLock();
private final static Condition producer_condition = lock1.newCondition();
private final static Condition consumer_condition = lock2.newCondition();
private final static LinkedList<Long> linkedList = new LinkedList<>();
private final static int MAX_CAPACITY = 5;

public static void main(String[] args) {
System.out.println("producer-consumer 两把锁");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
await_signal.producer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
System.out.println("-----------------");

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
await_signal.consumer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

}

static void producer() {
int size = 0;
try {
lock1.lock();
while (linkedList.size() == MAX_CAPACITY) {
producer_condition.await();
}
size = linkedList.size();
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":-------PRODUCER-------- " + value);
linkedList.addLast(value);
producer_condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock1.unlock();
}
if(size == 0){
try{
lock2.lock();
consumer_condition.signal();
}finally {
lock2.unlock();
}
}

}

static void consumer() {
int size = 0;
try {
lock2.lock();
while (linkedList.size() == 0) {
consumer_condition.await();
}
size = linkedList.size();
long value = linkedList.pop();
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
consumer_condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock2.unlock();
}
if(size == MAX_CAPACITY){
try{
lock1.lock();
producer_condition.signal();
}finally {
lock1.unlock();
}
}
}
}

感慨一句,源码写的真的精妙啊!读源码真的让自己学到了很多,就是有点可惜现在还不太懂设计模式,导致很多很好的思想还没有学会。。。

3. BlockingQueue 实现

别人写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package 多线程.Producer_Consumer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueue_product_consumer {
}
class ProductorConsumer {
private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Producer(queue));
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer(queue));
}
}


static class Producer implements Runnable {

private BlockingQueue queue;

public Producer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
Random random = new Random();
int i = random.nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
queue.put(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class Consumer implements Runnable {
private BlockingQueue queue;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
Integer element = (Integer) queue.take();
System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

自己无聊重写一个吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class BlockingQueue_product_consumer2 {
private static BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue(5);

public static void main(String[] args) {
System.out.println("BlockingQueue api 使用");

ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
while (true) {
long value = 0;
try {
value = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
}

});
}
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
while (true) {
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + "::-------PRODUCER-------- " + value);
try {
blockingQueue.put(value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
}
Thank you for your accept. mua!
-------------本文结束感谢您的阅读-------------